/**
 *
 */
package pers.vic.test.redis.subscribe;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import pers.vic.boot.util.redis.JedisUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
 *  @description: REDIS订阅的管理类， 对于注册到spring的全部的RedisSubscribeBase实现类，进行订阅的初始化，
 *  <p>
 *  	主要是为了避免开启多个阻塞线程；，使得一个项目只开启一个阻塞线程订阅所有RedisSubscribeBase现实类中定义的通道
 *  然后通过RedisSubPubManager回调；<br/>
 *
 *  另外，RedisSubscribeI中发布的信息(message)为String, 因为本人以为订阅发布只应传递简单的数据，若一定需要传递复杂参数，可自行序列化/反序列化;<br/>
 *
 *   <h3>在项目中新增订阅和发布的方式使用方式：</h3>
 *   继承RedisSubscribeI类，实现相关抽象方法，并注册到spring即可；
 *  </p>
 *  @author Vic.xu
 *  @date: 2020年9月28日上午10:11:40
 */
@Service
public class RedisSubPubManager {

    private static Logger logger = LoggerFactory.getLogger(RedisSubPubManager.class);

    private static Jedis jedis;

    /**
     * 全部的订阅的service
     */
    @Autowired
    private Collection<BaseRedisSubscribe> redisPubSubs;

    //由redisPubSubs 转化而来的
    public MultiValueMap<String, BaseRedisSubscribe> subscribeMap = new LinkedMultiValueMap<String, BaseRedisSubscribe>();

    @PostConstruct
    public void post() {
        if (CollectionUtils.isEmpty(redisPubSubs)) {
            logger.info("本项目没有订阅任何redis通道");
            return;
        }
        jedis = JedisUtil.getResource();
        initsubscribe();

        subscribe();
        logger.info("redis订阅发布管理器RedisSubPubManager初始化成功");
    }


    /**
     * 开启订阅线程
     */
    public void subscribe() {
        Set<String> channelSet = subscribeMap.keySet();
        String[] channels = channelSet.toArray(new String[0]);
        logger.info("本项目中包含的redis订阅的channgers：[{}]", StringUtils.join(channels, "; "));
        new Thread(new Runnable() {
            @Override
            public void run() {
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        List<BaseRedisSubscribe> list = subscribeMap.get(channel);
                        if (CollectionUtils.isEmpty(list)) {
                            logger.info("没有找到通道[{}]的消费者", channel);
                            return;
                        }
                        //订阅回调
                        for (BaseRedisSubscribe subscribeI : list) {
                            subscribeI.onMessage(message);
                        }
                    }
                }, channels);
            }
        }).start();
        ;
    }

    /**
     * 把订阅service集合转为map; 允许一个通道被多个service订阅
     */
    public void initsubscribe() {
        for (BaseRedisSubscribe subscriber : redisPubSubs) {
            String channelname = subscriber.getChannel();
            Assert.notNull(channelname, subscriber.getClass().getSimpleName() + " 没有配置订阅的通道");
            subscribeMap.add(channelname, subscriber);
        }
    }

    @PreDestroy
    public void destroy() {
        jedis.close();
    }

}



